# This script computes the share of individuals whose zip/county/state changed
# over the past 1 and 5 years.


# Load packages

import pyspark.sql.functions as sqlf
import pandas as pd
import numpy  as np
import os
import sys
import shutil
from pyspark     import SparkContext, SparkConf
from pyspark.sql import SQLContext
from SparkDataTools2 import *


# Filesystem layout for this job.
# WORKDIR is the process working directory and is also handed to saveStata;
# OUTDIR is handed to saveStata as well.
# CODEDIR is not referenced in the visible part of this script.
WORKDIR = '/geo_debt/geo/Spark'
CODEDIR = '/geo_debt/geo/Code/1_Spark/'
OUTDIR = '/geo_debt/geo/Output/Table'

# ZIP crosswalk, written as a Spark SQL path-based table reference so it can
# be spliced straight into a FROM/JOIN clause. The queries below join it on
# `zip` and read its `county` and `statefip` columns.
CW = 'parquet.`/geo_debt/geo/cw/cw_zip_cz`'

# Switch to the working directory before any Spark work starts.
os.chdir(WORKDIR)

#########################
###       Main        ###
#########################

# Snapshot years to process and the snapshot month used for every year.
yList = [2005, 2015]
m   = 9
geo_level = 'cz'  # NOTE(review): not referenced in the visible code - confirm before removing

# Snapshots from this year onward are read from the 'headers' table; earlier
# snapshots are read from the 'customs' table.
_HEADERS_FIRST_YEAR = 2009

# Restriction applied to 'headers' snapshots only.
# NOTE(review): presumably asOfDate/birthDate are yyyymmdd integers, making
# this an age-in-years filter (20 to 80) - confirm against the table schema.
_AGE_FILTER = ("birthDate IS NOT NULL AND "
               "FLOOR((asOfDate - birthDate)/10000) BETWEEN 20 AND 80")

# Geography columns and look-back horizons (in years) for which a "moved"
# indicator is produced. Output columns are named move_<geo>_<horizon>, and
# the lagged column for each geography is <geo>m<horizon>.
_GEOS = ('zip', 'county', 'statefip')
_HORIZONS = (1, 5)


def _zip_snapshot_sql(year, month, headers_view, customs_view):
    """Register the snapshot for (year, month) and return a SELECT over it.

    For years >= _HEADERS_FIRST_YEAR the 'headers' table is loaded, registered
    as temp view `headers_view`, and filtered with _AGE_FILTER. For earlier
    years the 'customs' table is loaded, registered as `customs_view`, and its
    `zipcode` column is exposed as `zip`; no filter is applied.

    Parameters:
        year, month:  snapshot to load through table_setup.
        headers_view: temp-view name used when the 'headers' table is read.
        customs_view: temp-view name used when the 'customs' table is read.

    Returns:
        A SQL string selecting `subjectKey, zip`, with a trailing space so it
        can be embedded directly as a subquery.
    """
    if year >= _HEADERS_FIRST_YEAR:
        snapshot = table_setup(sqlContext, year, month, 'headers')
        snapshot.createOrReplaceTempView(headers_view)
        return ("SELECT subjectKey, zip FROM " + headers_view +
                " WHERE " + _AGE_FILTER + " ")

    snapshot = table_setup(sqlContext, year, month, 'customs')
    snapshot.createOrReplaceTempView(customs_view)
    return "SELECT subjectKey, zipcode as zip FROM " + customs_view + " "


for y in yList:

    ym = yyyymm(y, m)
    print("^^^ "+ym+" ^^^")

    # Only the tables the query actually reads are loaded: the current
    # snapshot plus the snapshots one and five years earlier.
    base = _zip_snapshot_sql(y,     m, "headersData",    "customsData")
    p1   = _zip_snapshot_sql(y - 1, m, "headersData01",  "customsData01")
    p5   = _zip_snapshot_sql(y - 5, m, "headersData050", "customsData05")

    # Attach each subject's zip from 1 and 5 years ago. LEFT JOINs keep
    # subjects that are absent from an earlier snapshot (lagged zip = NULL).
    sql_allthreeyears = (
        "SELECT a.subjectKey, a.zip, "
        "p1.zip AS zipm1, p5.zip AS zipm5 "
        "FROM (" + base + ") a "
        "LEFT JOIN (" + p1 + ") AS p1 ON a.subjectKey=p1.subjectKey "
        "LEFT JOIN (" + p5 + ") AS p5 ON a.subjectKey=p5.subjectKey "
    )

    # Map current and lagged zips to county/state through the crosswalk.
    # The current zip must be in the crosswalk (INNER JOIN); lagged zips that
    # are missing or unmatched leave the lagged county/state NULL.
    sql_2 = (
        "SELECT a.subjectKey, a.zip, a.zipm1, a.zipm5, "
        "cw.county, cw.statefip, "
        "cwp1.county AS countym1, cwp1.statefip AS statefipm1, "
        "cwp5.county AS countym5, cwp5.statefip AS statefipm5 "
        "FROM (" + sql_allthreeyears + ") a "
        "INNER JOIN (" + CW + ") cw ON a.zip=cw.zip "
        "LEFT JOIN (" + CW + ") cwp1 ON a.zipm1=cwp1.zip "
        "LEFT JOIN (" + CW + ") cwp5 ON a.zipm5=cwp5.zip "
    )

    # One 0/1 "moved" indicator per geography and horizon. The indicator is
    # NULL when the lagged value is unknown, so AVG below skips that subject.
    move_flags = ", ".join(
        "CASE WHEN {g}m{h} IS NULL THEN NULL "
        "WHEN {g}!={g}m{h} THEN 1 ELSE 0 END AS move_{g}_{h}".format(g=geo, h=horizon)
        for geo in _GEOS for horizon in _HORIZONS
    )
    sql_ind = "SELECT " + move_flags + " FROM (" + sql_2 + ") "

    # The mean of each indicator is the share of subjects who moved.
    move_shares = ", ".join(
        "AVG(move_{g}_{h}) AS move_{g}_{h}".format(g=geo, h=horizon)
        for geo in _GEOS for horizon in _HORIZONS
    )
    sql = "SELECT " + move_shares + " FROM (" + sql_ind + ") "

    saveStata(sql, 'moving_'+ym, WORKDIR, OUTDIR)

sqlContext.clearCache()

print(' !!!! SUCCESS !!!!')
